{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pandas API in Apache Beam\n",
    "\n",
    "Apache Beam 2.26 onwards supports the Pandas API. This makes it very convenient to write complex pipelines, and execute them at scale, or in a streaming manner."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[31mERROR: After October 2020 you may experience errors when installing or updating packages. This is because pip will change the way that it resolves dependency conflicts.\n",
      "\n",
      "We recommend you use --use-feature=2020-resolver to test your packages with the new resolver before it becomes the default.\n",
      "\n",
      "tfx 0.23.0 requires attrs<20,>=19.3.0, but you'll have attrs 20.3.0 which is incompatible.\n",
      "tfx 0.23.0 requires google-resumable-media<0.7.0,>=0.6.0, but you'll have google-resumable-media 1.1.0 which is incompatible.\n",
      "tfx 0.23.0 requires kubernetes<12,>=10.0.1, but you'll have kubernetes 12.0.0 which is incompatible.\n",
      "tensorflow-data-validation 0.23.1 requires joblib<0.15,>=0.12, but you'll have joblib 0.17.0 which is incompatible.\u001b[0m\n",
      "Note: you may need to restart the kernel to use updated packages.\n"
     ]
    }
   ],
   "source": [
    "%pip install --quiet apache-beam[gcp]==2.26.0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2.26.0\n"
     ]
    }
   ],
   "source": [
    "import apache_beam as beam\n",
    "import pandas as pd\n",
    "print(beam.__version__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Experiment on one day of flights data\n",
    "\n",
    "Let's pull out one day of data using BigQuery and display the stats we need."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bigquery df\n",
    "SELECT  \n",
    "  airline,\n",
    "  departure_airport,\n",
    "  arrival_airport,\n",
    "  departure_delay,\n",
    "  arrival_delay\n",
    "FROM `bigquery-samples.airline_ontime_data.flights`\n",
    "WHERE date = '2006-08-20'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>airline</th>\n",
       "      <th>departure_airport</th>\n",
       "      <th>arrival_airport</th>\n",
       "      <th>departure_delay</th>\n",
       "      <th>arrival_delay</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>AS</td>\n",
       "      <td>ANC</td>\n",
       "      <td>DEN</td>\n",
       "      <td>-1.0</td>\n",
       "      <td>-12.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>F9</td>\n",
       "      <td>ANC</td>\n",
       "      <td>DEN</td>\n",
       "      <td>0.0</td>\n",
       "      <td>0.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>UA</td>\n",
       "      <td>ANC</td>\n",
       "      <td>DEN</td>\n",
       "      <td>-7.0</td>\n",
       "      <td>14.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>F9</td>\n",
       "      <td>PHX</td>\n",
       "      <td>DEN</td>\n",
       "      <td>14.0</td>\n",
       "      <td>8.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>F9</td>\n",
       "      <td>PHX</td>\n",
       "      <td>DEN</td>\n",
       "      <td>-2.0</td>\n",
       "      <td>4.0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "  airline departure_airport arrival_airport  departure_delay  arrival_delay\n",
       "0      AS               ANC             DEN             -1.0          -12.0\n",
       "1      F9               ANC             DEN              0.0            0.0\n",
       "2      UA               ANC             DEN             -7.0           14.0\n",
       "3      F9               PHX             DEN             14.0            8.0\n",
       "4      F9               PHX             DEN             -2.0            4.0"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DFW    447\n",
       "ORD    214\n",
       "MIA    112\n",
       "LAX     86\n",
       "LGA     59\n",
       "Name: airport, dtype: int64"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# most frequent airports used by each carrier\n",
    "aa = df.groupby('airline').get_group('AA')\n",
    "\n",
    "arr = aa.rename(columns={'arrival_airport': 'airport'}).airport.value_counts()\n",
    "arr.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DFW    450\n",
       "ORD    213\n",
       "MIA    112\n",
       "LAX     86\n",
       "LGA     57\n",
       "Name: airport, dtype: int64"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "dep = aa.rename(columns={'departure_airport': 'airport'}).airport.value_counts()\n",
    "dep.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array(['DFW', 'ORD', 'MIA', 'LAX', 'LGA', 'STL', 'SJU', 'BOS', 'SFO',\n",
       "       'JFK'], dtype=object)"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "total = arr + dep\n",
    "top_airports = total.nlargest(10)\n",
    "top_airports.index.values"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "departure_delay    4.533036\n",
       "arrival_delay      1.350000\n",
       "dtype: float64"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "means = aa[aa['arrival_airport'].isin(top_airports.index.values)].mean()\n",
    "means"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'airline': 'AA', 'departure_delay': 4.533035714285714, 'arrival_delay': 1.35}\n"
     ]
    }
   ],
   "source": [
    "print({\n",
    "    'airline': aa.airline.iloc[0],\n",
    "    'departure_delay': means['departure_delay'],\n",
    "    'arrival_delay': means['arrival_delay'],\n",
    "})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Combine the Pandas code into functions\n",
    "\n",
    "Make functions out of the Pandas code so that it is repeatable"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "airline\n",
       "AA      4.533036,1.350000\n",
       "AS    21.035503,22.032544\n",
       "B6      6.363905,5.890533\n",
       "CO      7.312169,2.442681\n",
       "DL    13.549398,17.514458\n",
       "EV    22.855721,26.079602\n",
       "F9      3.686170,3.882979\n",
       "FL    20.049217,23.006711\n",
       "HA    -3.890411,-3.383562\n",
       "KH    -1.608392,-1.552448\n",
       "MQ      5.169554,3.707921\n",
       "NW      4.281379,2.769655\n",
       "OH      4.923445,6.473684\n",
       "OO      9.402878,8.790168\n",
       "TZ     9.333333,11.588235\n",
       "UA      6.776151,3.776151\n",
       "US      4.423826,1.066438\n",
       "WN     4.107368,-2.775439\n",
       "XE      3.317007,2.081633\n",
       "YV    15.212632,10.185263\n",
       "dtype: object"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# do this for all the carriers\n",
    "def get_delay_at_top_airports(aa):\n",
    "    arr = aa.rename(columns={'arrival_airport': 'airport'}).airport.value_counts()\n",
    "    dep = aa.rename(columns={'departure_airport': 'airport'}).airport.value_counts()\n",
    "    total = arr + dep\n",
    "    top_airports = total.nlargest(10)\n",
    "    means = aa[aa['arrival_airport'].isin(top_airports.index.values)].mean()\n",
    "    return '{:2f},{:2f}'.format(\n",
    "        means['departure_delay'], means['arrival_delay'])\n",
    "\n",
    "df.groupby('airline').apply(get_delay_at_top_airports)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Productionize pipeline using Apache Beam on Dataflow\n",
    "\n",
    "Apache Beam lets you run batch and streaming pipelines at scale and in resilient way.\n",
    "To do this, build a pipeline.\n",
    "* Do it on full dataset (batch)\n",
    "* Do it on streaming data by adding a Sliding or Fixed Time Window to process daily/hourly/minute-by-minute data as it comes in."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1156032000.0\n"
     ]
    }
   ],
   "source": [
    "import time\n",
    "import datetime\n",
    "def to_unixtime(s):\n",
    "    return time.mktime(datetime.datetime.strptime(s, \"%Y-%m-%d\").timetuple())\n",
    "print(to_unixtime('2006-08-20'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "from apache_beam.dataframe.convert import to_dataframe, to_pcollection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "ename": "NotImplementedError",
     "evalue": "'apply' is not yet supported (BEAM-9547)",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mNotImplementedError\u001b[0m                       Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-20-4f9aa53afce1>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m     29\u001b[0m     \u001b[0;31m# get dataframes corresponding to each group, and apply our function to it\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     30\u001b[0m     \u001b[0;31m#result = agg | 'avg delays' >> beam.Map(lambda pc: to_dataframe(pc).get_delay_at_top_airports)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 31\u001b[0;31m     \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mgrouped\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mapply\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mget_delay_at_top_airports\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     32\u001b[0m     \u001b[0mresult\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mto_csv\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'output.csv'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.7/site-packages/apache_beam/dataframe/frame_base.py\u001b[0m in \u001b[0;36mwrapper\u001b[0;34m(self, *args, **kwargs)\u001b[0m\n\u001b[1;32m    316\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mnot_implemented_method\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mjira\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'BEAM-9547'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    317\u001b[0m   \u001b[0;32mdef\u001b[0m \u001b[0mwrapper\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 318\u001b[0;31m     \u001b[0;32mraise\u001b[0m \u001b[0mNotImplementedError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"'%s' is not yet supported (%s)\"\u001b[0m \u001b[0;34m%\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0mop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mjira\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    319\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    320\u001b[0m   \u001b[0;32mreturn\u001b[0m \u001b[0mwrapper\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mNotImplementedError\u001b[0m: 'apply' is not yet supported (BEAM-9547)"
     ]
    }
   ],
   "source": [
    "query = \"\"\"\n",
    "SELECT  \n",
    "  date,\n",
    "  airline,\n",
    "  departure_airport,\n",
    "  arrival_airport,\n",
    "  departure_delay,\n",
    "  arrival_delay\n",
    "FROM `bigquery-samples.airline_ontime_data.flights`\n",
    "\"\"\"\n",
    "with beam.Pipeline() as p:\n",
    "    tbl = (p \n",
    "           | 'read table' >> beam.io.ReadFromBigQuery(query=query)\n",
    "           | 'assign ts' >> beam.Map(\n",
    "               lambda x: beam.window.TimestampedValue(x, to_unixtime(x['date'])))\n",
    "           | 'set schema' >> beam.Select(\n",
    "               date=lambda x: str(x['date']),\n",
    "               airline=lambda x: str(x['airline']),\n",
    "               departure_airport=lambda x: str(x['departure_airport']),\n",
    "               arrival_airport=lambda x: str(x['arrival_airport']),\n",
    "               departure_delay=lambda x: float(x['departure_delay']),\n",
    "               arrival_delay=lambda x: float(x['arrival_delay']))\n",
    "          )\n",
    "    daily = tbl | 'daily windows' >> beam.WindowInto(beam.window.FixedWindows(60*60*24))\n",
    "    # group the flights data by carrier\n",
    "    df = to_dataframe(daily)\n",
    "    grouped = df.groupby('airline')\n",
    "    #agg = to_pcollection(grouped.groups)\n",
    "    # get dataframes corresponding to each group, and apply our function to it\n",
    "    #result = agg | 'avg delays' >> beam.Map(lambda pc: to_dataframe(pc).get_delay_at_top_airports)\n",
    "    result = grouped.apply(get_delay_at_top_airports)\n",
    "    result.to_csv('output.csv')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
   ]
  }
 ],
 "metadata": {
  "environment": {
   "name": "tf2-2-3-gpu.2-3.m59",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/tf2-2-3-gpu.2-3:m59"
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
